<?php

/**
 * redis 实现消息队列
 * Class RedisQueue
 */
class RedisQueue
{
    public static $traceCode = ''; //处理某个消息时的日志跟踪码

    /**
     * 往队列塞消息/不存在就创建 (样例)
     * @param string $key
     * @param string|json|array $value
     * @return int
     */
    public static function pushQueue($key, $value)
    {
        if (is_array($value)) {
            $value = json_encode($value);
        }
        return IRedis::getInstance()->lPush($key, $value);
    }

    //删除队列
    public static function delQueue($key)
    {
        return IRedis::getInstance()->del($key);
    }

    /**
     * 退出pop进程
     * @param string $key 名字不能为空
     * @return bool
     */
    public static function killPopQueue($key)
    {
        if (empty($key)) {
            return false;
        }
        $key = escapeshellcmd($key);
        $command = "ps -ef | grep php | grep blockpop | grep $key | awk '{print $2}' | xargs kill ";
        exec($command, $output);
        print_r($output);
    }

    /**
     * 重启pop进程, 样例
     */
    public static function restartQueue($key, $holdTime=0)
    {
        self::killPopQueue($key);
        self::blockPopQueue($key, $holdTime);
        return true;
    }

    /**
     * 阻塞出队列
     * @param array|string $queueKey 队列名
     * @param int $holdTime 阻塞时间
     */
    public static function blockPopQueue($queueKey, $holdTime=0)
    {
        while(true) {
            list($keyName, $jsonValue) = IRedis::getInstance()->brPop($queueKey, $holdTime);
            if (!empty($jsonValue)) {
                self::handle($keyName, $jsonValue);
            }
        }
    }

    //分发消息
    public static function handle($queueKey, $message)
    {
        FileLog::$uuid = Fun::randChar('10');
        try {
            FileLog::info("开始处理消息: $queueKey $message");
            $msgBody = json_decode($message, true);
            $notifyUrls = RedisConfig::getTopicNotifyUrl($queueKey);
            foreach ($notifyUrls as $url) {
                FileLog::info(' 分发消息开始: ' . $url, 'db_queue');
                ICurl::ini($url)
                    ->setConnectTimeOutMs(100)
                    ->setExecTimeOutMs(100)
                    ->setPostData($msgBody)
                    ->post();
                FileLog::info('db_queue 分发消息结束: ' . $url, 'db_queue');
            }
            FileLog::info("消息处理结束: $queueKey $message", 'db_queue');
        } catch (Exception $e) {
            FileLog::error($e->getMessage(), 'db_queue');
        }
    }
}
